// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
	"context"
	goerr "errors"
	"fmt"
	"strings"
	"time"

	"github.com/pingcap/errors"
	sess "github.com/pingcap/tidb/pkg/ddl/session"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/owner"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"go.uber.org/zap"
)

// SchemaChangeHandler function is used by subscribers to handle the
// SchemaChangeEvent generated by the publisher (DDL module currently). It will
// be called at least once for every SchemaChange. The sctx has already started
// a pessimistic transaction and the handler should execute its SQL modification
// logic exactly once with it. After the function returns, the subscribing
// framework will commit the whole transaction, together with an internal flag
// modification, to provide exactly-once delivery. The handler will be called
// periodically, with no guarantee about the latency between the execution time
// and SchemaChangeEvent happening time.
//
// The handler function must be registered by RegisterHandler before the
// DDLNotifier is started. If the handler can't immediately serve the handling
// after registering, it can return nil to tell the DDLNotifier to act like the
// change has been handled, or return ErrNotReadyRetryLater to hold the change
// and re-handle later.
type SchemaChangeHandler func(
	ctx context.Context,
	sctx sessionctx.Context,
	change *SchemaChangeEvent,
) error

// ErrNotReadyRetryLater should be returned by a registered handler that is not
// ready to process the events. The held events will be re-delivered to the
// handler in a later polling round.
var ErrNotReadyRetryLater = errors.New("not ready, retry later")

// HandlerID is the type of the persistent ID used to register a handler. Every
// ID occupies a bit in a BIGINT column, so at most we can only have 64 IDs. To
// avoid duplicate IDs, all IDs should be defined in below declaration.
type HandlerID int

const (
	// TestHandlerID is used for testing only.
	TestHandlerID HandlerID = 0
	// StatsMetaHandlerID is used to update statistics system table.
	StatsMetaHandlerID HandlerID = 1
	// PriorityQueueHandlerID is used to update the priority queue.
	PriorityQueueHandlerID HandlerID = 2
)

// String implements fmt.Stringer interface.
func (id HandlerID) String() string {
	switch id {
	case TestHandlerID:
		return "TestHandler"
	case StatsMetaHandlerID:
		return "StatsMetaHandler"
	case PriorityQueueHandlerID:
		return "PriorityQueueHandler"
	default:
		// Fall back to the numeric form for IDs without a friendly name.
		return fmt.Sprintf("HandlerID(%d)", id)
	}
}

// Ensure DDLNotifier implements the owner.Listener interface.
// The DDLNotifier is started only when the stats owner is elected to ensure consistency.
// This design is crucial because:
//  1. The stats handler(priority queue) processes DDLNotifier events in memory.
//  2. Keeping the stats handler and DDLNotifier on the same node maintains data integrity.
//  3. It prevents potential race conditions or inconsistencies that could arise from
//     distributed processing of these events across multiple nodes.
var _ owner.Listener = (*DDLNotifier)(nil)

// DDLNotifier implements the subscription on DDL events.
type DDLNotifier struct {
	// The context is initialized in Start and canceled in Stop and Close.
	ctx    context.Context
	cancel context.CancelFunc
	// wg tracks the background polling goroutine started in OnBecomeOwner.
	wg util.WaitGroupWrapper
	// sysSessionPool supplies the sessions used to list, process and delete events.
	sysSessionPool util.SessionPool

	// store is the persistent storage of pending SchemaChange events.
	store Store
	// handlers maps each registered HandlerID to its SchemaChangeHandler.
	// It is populated by RegisterHandler before start and read-only afterwards.
	handlers map[HandlerID]SchemaChangeHandler
	// pollInterval is how often the background loop checks for new events.
	pollInterval time.Duration

	// handlersBitMap is set to the full bitmap of all registered handlers in Start.
	handlersBitMap uint64
}

// NewDDLNotifier initializes the global DDLNotifier.
func NewDDLNotifier(
	sysSessionPool util.SessionPool,
	store Store,
	pollInterval time.Duration,
) *DDLNotifier {
	notifier := &DDLNotifier{
		sysSessionPool: sysSessionPool,
		store:          store,
		handlers:       map[HandlerID]SchemaChangeHandler{},
		pollInterval:   pollInterval,
	}
	return notifier
}

// RegisterHandler must be called with an exclusive and fixed HandlerID for each
// handler to register the handler. Illegal ID will panic. RegisterHandler should
// not be called after the global DDLNotifier is started.
//
// RegisterHandler is not concurrency-safe.
func (n *DDLNotifier) RegisterHandler(id HandlerID, handler SchemaChangeHandler) {
	// processedByFlag keeps one bit per handler in a BIGINT UNSIGNED column,
	// so only IDs in [0, 64) are representable.
	if v := int(id); v < 0 || v > 63 {
		panic(fmt.Sprintf("illegal HandlerID: %d", id))
	}

	if _, dup := n.handlers[id]; dup {
		// In some tests, we register the same handler multiple times because we
		// create multiple stats handles in the same test. Keep the first
		// registration and only log the duplicate.
		logutil.BgLogger().Error("HandlerID already registered", zap.Int("id", int(id)))
		return
	}

	n.handlers[id] = handler
}

// start starts the DDLNotifier. It will block until the context is canceled.
// Do not call this function directly. Use owner.Listener interface instead.
func (n *DDLNotifier) start() {
	// Snapshot the complete bitmap of registered handlers; RegisterHandler
	// must not be called after this point.
	for id := range n.handlers {
		n.handlersBitMap |= 1 << id
	}

	ctx := logutil.WithCategory(
		kv.WithInternalSourceType(n.ctx, kv.InternalDDLNotifier),
		"ddl-notifier",
	)
	ticker := time.NewTicker(n.pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			err := n.processEvents(ctx)
			if err == nil {
				continue
			}
			// In tests, only cancellation and a few injected failures are
			// acceptable; anything else indicates a real bug.
			intest.Assert(
				errors.ErrorEqual(err, context.Canceled) ||
					strings.Contains(err.Error(), "mock handleTaskOnce error") ||
					strings.Contains(err.Error(), "session pool closed"),
				fmt.Sprintf("error processing events: %v", err),
			)
			logutil.Logger(ctx).Error("Error processing events", zap.Error(err))
		}
	}
}

// ProcessEventsBatchSize is the number of events to process in a SQL query. It's
// exposed for testing.
var ProcessEventsBatchSize = 1024

// processEvents performs one polling round: it lists all pending changes,
// delivers each one to every registered handler that has not yet processed it,
// and deletes changes that every handler has acknowledged.
func (n *DDLNotifier) processEvents(ctx context.Context) error {
	// One session streams the change list while a second session runs the
	// handlers, so listing and processing do not share a transaction.
	s, err := n.sysSessionPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer n.sysSessionPool.Put(s)
	sess4List := sess.NewSession(s.(sessionctx.Context))
	result, closeFn := n.store.List(ctx, sess4List)
	defer closeFn()

	s2, err := n.sysSessionPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer n.sysSessionPool.Put(s2)
	sess4Process := sess.NewSession(s2.(sessionctx.Context))

	// we should ensure deliver order of events to a handler, so if a handler returns
	// error for previous events it should not receive later events.
	skipHandlers := make(map[HandlerID]struct{})

	// Reused read buffer; Read fills at most ProcessEventsBatchSize changes per call.
	changes := make([]*SchemaChange, ProcessEventsBatchSize)

	for {
		count, err2 := result.Read(changes)
		if err2 != nil {
			return errors.Trace(err2)
		}
		if count == 0 {
			break
		}

		for _, change := range changes[:count] {
			for handlerID, handler := range n.handlers {
				if _, ok := skipHandlers[handlerID]; ok {
					continue
				}
				if err3 := n.processEventForHandler(ctx, sess4Process, change, handlerID, handler); err3 != nil {
					// Stop delivering later events to this handler in this
					// round to preserve per-handler ordering.
					skipHandlers[handlerID] = struct{}{}

					// ErrNotReadyRetryLater is an expected back-off signal,
					// not an error worth logging.
					if !goerr.Is(err3, ErrNotReadyRetryLater) {
						logutil.Logger(ctx).Error("Error processing change",
							zap.Int64("ddlJobID", change.ddlJobID),
							zap.Int64("subJobID", change.subJobID),
							zap.Stringer("handler", handlerID),
							zap.Error(err3))
					}
					continue
				}
			}

			if intest.InTest {
				if n.handlersBitMap == 0 {
					// There are unit tests that directly check the system table while no subscriber
					// is registered. We continue the loop to skip DELETE the events in table so
					// tests can check them.
					continue
				}
			}

			// Once every registered handler has acknowledged the change, remove
			// it from storage using a dedicated session so the DELETE commits
			// independently of the processing session.
			if change.processedByFlag == n.handlersBitMap {
				s3, err3 := n.sysSessionPool.Get()
				if err3 != nil {
					return errors.Trace(err3)
				}
				sess4Del := sess.NewSession(s3.(sessionctx.Context))
				err3 = n.store.DeleteAndCommit(
					ctx,
					sess4Del,
					change.ddlJobID,
					int(change.subJobID),
				)
				n.sysSessionPool.Put(s3)
				// Deletion failure is logged, not returned: the fully-processed
				// change will simply be seen (and skipped by handlers) again in
				// the next round.
				if err3 != nil {
					logutil.Logger(ctx).Error("Error deleting change",
						zap.Int64("ddlJobID", change.ddlJobID),
						zap.Int64("subJobID", change.subJobID),
						zap.Error(err3))
				}
			}
		}
	}

	return nil
}

// slowHandlerLogThreshold is the handler duration above which a warning is logged.
const slowHandlerLogThreshold = time.Second * 5

// processEventForHandler delivers one change to one handler inside a
// pessimistic transaction and persists the handler's bit in processedByFlag in
// the same transaction, providing exactly-once delivery.
func (n *DDLNotifier) processEventForHandler(
	ctx context.Context,
	session *sess.Session,
	change *SchemaChange,
	handlerID HandlerID,
	handler SchemaChangeHandler,
) (err error) {
	// Already delivered to this handler in an earlier round; nothing to do.
	if (change.processedByFlag & (1 << handlerID)) != 0 {
		return nil
	}
	newFlag := change.processedByFlag | (1 << handlerID)

	if err = session.BeginPessimistic(ctx); err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err != nil {
			session.Rollback()
			return
		}

		err = errors.Trace(session.Commit(ctx))
		// Update the in-memory flag only after a successful commit so the
		// cached state never runs ahead of the persisted one.
		if err == nil {
			change.processedByFlag = newFlag
		}
	}()

	now := time.Now()
	if err = handler(ctx, session.Context, change.event); err != nil {
		return errors.Trace(err)
	}
	if time.Since(now) > slowHandlerLogThreshold {
		logutil.Logger(ctx).Warn("Slow process event",
			zap.Stringer("handler", handlerID),
			zap.Int64("ddlJobID", change.ddlJobID),
			zap.Int64("subJobID", change.subJobID),
			zap.Stringer("event", change.event),
			zap.Duration("duration", time.Since(now)))
	}

	// Record the new flag in the same transaction as the handler's SQL, using
	// the old flag as an optimistic guard against concurrent updates.
	return errors.Trace(n.store.UpdateProcessed(
		ctx,
		session,
		change.ddlJobID,
		change.subJobID,
		change.processedByFlag,
		newFlag,
	))
}

// Stop stops the background loop.
// Exposed for testing.
// Do not call this function directly. Use owner.Listener interface instead.
func (n *DDLNotifier) Stop() {
	// A nil cancel func means the notifier was never started.
	if n.cancel != nil {
		n.cancel()
		n.wg.Wait()
	}
}

// OnBecomeOwner implements the owner.Listener interface.
// We need to make sure only one DDLNotifier is running at any time.
func (n *DDLNotifier) OnBecomeOwner() {
	n.ctx, n.cancel = context.WithCancel(context.Background())
	onPanic := func(r any) {
		if r == nil {
			return
		}
		// In unit tests, we want to panic directly to find the root cause.
		if intest.InTest {
			panic(r)
		}
		logutil.BgLogger().Error("panic in ddl notifier", zap.Any("recover", r), zap.Stack("stack"))
	}
	n.wg.RunWithRecover(n.start, onPanic)
}

// OnRetireOwner implements the owner.Listener interface.
// After the owner is retired, we need to stop the DDLNotifier so that only the
// new owner's notifier keeps processing events.
func (n *DDLNotifier) OnRetireOwner() {
	n.Stop()
}
